Sample-Code-Snippets/Python/Azure Function Example/simple_receive.py (32 lines of code) (raw):
import asyncio
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
async def on_event(partition_context, event):
# Print the event data.
print(f'Received an event: {event.body_as_str(encoding="UTF-8")} from the partition with ID: "{partition_context.partition_id}"')
await partition_context.update_checkpoint(event)
async def on_partition_initialize(partition_context):
print(f"Partition: {partition_context.partition_id} has been initialized.")
async def on_partition_close(partition_context, reason):
print(f"Partition: {partition_context.partition_id} has been closed for {reason}.")
async def on_error(partition_context, error):
print(f"Partition: {partition_context.partition_id} has errored. {error}")
async def main():
event_hub_right_path = 'right_eh'
event_hub_left_path = 'left_eh'
right_cg = "rcg1"
left_cg = "lcg1"
# Create a consumer client for the event hub.
consumer = EventHubConsumerClient.from_connection_string(
conn_str="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
eventhub_name=event_hub_left_path,
consumer_group=left_cg)
async with consumer:
await consumer.receive(
on_event=on_event,
on_error=on_error,
on_partition_close=on_partition_close,
on_partition_initialize=on_partition_initialize,
starting_position="-1"
)
if __name__ == "__main__":
# Run the main method.
asyncio.run(main())